CloudTrailログからAmazon Athenaを使ってログイン監査レポートを作成する
西澤です。CloudTrailって本当に素晴らしい機能なのですが、ログ出力の形式がJSONなので、そのままだと扱いづらかったりします。これに対する対処法については、まず下記の素晴らしいブログをご覧ください。
で、CloudTrailログに対するクエリをいざやってみようと思ったのですが、HadoopもPrestoも触ったことがなかった私にはまだちょっと難しい。今回こちらに記録しておくことで、自分が再利用できるようにしておきたいと考えました。
前提
本記事では、下記を前提とさせていただきますので、ご不明点がある方は事前にご確認ください。
- CloudTrailログがS3バケットに出力されるよう設定されている
- Ahtenaのクエリ性能や課金に関しては考慮しない、なので、Athenaパーティションについては考えない
CloudTrailログにAthenaからクエリする
CloudTrailログのフォーマットを確認して出力したい項目を決める
まずは、CloudTrailログのフォーマットを確認しておきましょう。下記ハイライトされた項目のみを出力しようと考えました。
{ "Records": [{ "eventVersion": "1.01", "userIdentity": { "type": "IAMUser", "principalId": "AIDAJDPLRKLG7UEXAMPLE", "arn": "arn:aws:iam::123456789012:user/Alice", "accountId": "123456789012", "accessKeyId": "AKIAIOSFODNN7EXAMPLE", "userName": "Alice", "sessionContext": { "attributes": { "mfaAuthenticated": "false", "creationDate": "2014-03-18T14:29:23Z" } } }, "eventTime": "2014-03-18T14:30:07Z", "eventSource": "cloudtrail.amazonaws.com", "eventName": "StartLogging", "awsRegion": "us-west-2", "sourceIPAddress": "72.21.198.64", "userAgent": "signin.amazonaws.com", "requestParameters": { "name": "Default" }, "responseElements": null, "requestID": "cdc73f9d-aea9-11e3-9d5a-835b769c0d9c", "eventID": "3074414d-c626-42aa-984b-68ff152d6ab7" }, ... additional entries ... ]
Athenaでテーブルを作成する
records以下がARRAY
、userIdentity以下が1つ階層が深いのでSTRUCT
になるところがポイントだと思います。LOCATIONに指定するS3のフォルダ配下の全てがスキャン範囲(課金範囲)になるのでご注意ください。
CREATE EXTERNAL TABLE IF NOT EXISTS records_201702 ( records ARRAY< STRUCT< eventTime:STRING, eventSource:STRING, eventName:STRING, awsRegion:STRING, sourceIPAddress:STRING, userIdentity:STRUCT< type:STRING, arn:STRING > > > ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' LOCATION 's3://[CloudTrailログ出力バケット名]/[prefix指定があれば]/AWSLogs/[AWSアカウントID]/CloudTrail/[Region]/2017/02';
無事にテーブル作成に成功しました。
JSONにクエリを投げる
まずは、そのままクエリを実行します。
SELECT * FROM records_201702 LIMIT 10;
このままだとrecords以下の配列がそのまま出てきます。結局S3にあるファイル1つ分が1レコードになった状態です。
次にrecords配下の配列を分解して別々のレコードにしてあげます。
SELECT record FROM records_201702 CROSS JOIN UNNEST(records) AS t (record) LIMIT 10;
records配下が配列ではなくなり、分解されたレコードになりました。
今度はカラムをそれぞれ表示させるようにします。階層が深いものは"."を付けて表現するだけでOKです。
SELECT record.eventTime, record.eventSource, record.eventName, record.awsRegion, record.sourceIPAddress, record.userIdentity.type, record.userIdentity.arn FROM records_201702 CROSS JOIN UNNEST(records) AS t (record) LIMIT 10;
ここまで来ればもうJSONは忘れて大丈夫ですね。
もう大丈夫かと思いますが、ConsoleLogin
だけで絞り込んでみましょう。
SELECT record.eventTime, record.eventSource, record.eventName, record.awsRegion, record.sourceIPAddress, record.userIdentity.type, record.userIdentity.arn FROM records_201702 CROSS JOIN UNNEST(records) AS t (record) WHERE record.eventName = 'ConsoleLogin' LIMIT 10;
ここまで来れば、もうWHERE句で自由にフィルタできると思います。ログイン監査レポートのできあがりです。
まとめ
CREATE TABLE文と最後に紹介したSELECT文だけコピペしてもらえれば、10分もかけずに実行結果が得られるはずです。Athenaを試してみたことが無い方にもぜひ試してみていただきたいです。調査の度にJSONと格闘するのもなかなか辛いと思いますので、やってみたブログをぜひご活用いただければと思います。
どこかの誰かのお役に立てば嬉しいです。